---
title: "Master NOAA_EPA"
author: "Colby Wilkinson"
date: "2/23/2020"
output: html_document
---

Installing and loading packages needed for data collection

```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
#install.packages("tidyverse", "threadr", "data.table")
#remotes::install_github("skgrange/threadr")
library(tidyverse)
library(threadr)
library(data.table)

dir.create("./output")
```


### Background

This markdown file is used to collect and re-shape NOAA Integrated Suraface Data (ISD) and EPA Air Quality System (AQS) data for Replication Data for: No, Judges Are Not Influenced by Temperature (or Other Weather): Comment (https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/3LOR3R)

The initial datafiles are saved in the 'initial_data' directory, which include the locations of interest, the ISD station and AQS site information and a crosswalk file for coverting zipcode into FIPS code.  All resulting data files from this script are saved in the 'output' directory.

### Assigning NOAA ISD Station to each location

Reading in the current Integrated Surface Data (ISD) stations, made available by NOAA.  Stations that ended collection after 1989 or do not have a state/territory listed are omitted.

```{r}
isd_stations <- read_csv("./initial_data/isd_stations.csv")%>% 
  mutate(station_zip_code = str_pad(station_zip_code, side = "left", width = 5, pad = "0")) %>% 
  select(station_name, station_state, LAT, LON, BEGIN, END, station_zip_code, station_address, ISD_station, airport_hub) %>% 
  filter(as.numeric(str_sub(END, start = 1, end = 4)) > 1989 & !is.na(station_state)& !is.na(LAT))
```

Loading locations dataset

```{r}
locations <- read_csv("./initial_data/locations.csv")
```


Function to estimate distance from station to location

```{r}
source("./utils/distance.R")
```

Based on exploratory analyses with ISD data, it appears airports have more consistent data availability, and major airports are more consistent than smaller airports.  We have flagged ISD stations that the FAA considers large, medium or small hub airports. Therefore, we will match locations to ISD stations using the following rules

* Filter stations within 20 miles, or within 35 miles is no stations are within 20 miles
* Match to the closest large-hub airport
* If no large-hub airport is available, match to the nearest medium-hub airport.
* If no large or medium-hub airport is available, match to the nearest small hub airport
* If no large, medium or small-hub airport is available, match to the nearest station with the most years of collection within 1990-2019, our period of interest.

```{r}
locations_ISD <- locations %>% 
 merge(isd_stations, all = TRUE) %>% 
  mutate(station_distance = distance(location_lat, location_lon, LAT, LON)) %>% 
  group_by(location_code) %>% 
  filter(station_distance < 20 | (station_distance < 35 & !any(station_distance < 20))) %>% 
  mutate(L_hub = any(airport_hub == "L", na.rm = TRUE),
         M_hub = any(airport_hub == "M", na.rm = TRUE),
         S_hub = any(airport_hub == "S", na.rm = TRUE)) %>% 
  filter((airport_hub == "L" & L_hub) |
         (airport_hub == "M" & !L_hub & M_hub) |
         (airport_hub == "S" & !L_hub & !M_hub & S_hub) |
         (!L_hub & !M_hub & !S_hub)) %>% 
  filter(((L_hub | M_hub | S_hub) & station_distance == min(station_distance)) | !(L_hub | M_hub | S_hub)) %>% 
  mutate(airport_station = (L_hub | M_hub | S_hub)) %>% 
  ungroup() %>% 
  rowwise() %>% 
  mutate(years_collect = sum(c(as.numeric(str_sub(BEGIN, start = 1, end = 4)):as.numeric(str_sub(END, start = 1, end = 4)))  %in% c(1990:2019))) %>% 
  group_by(location_code) %>% 
  filter(years_collect == max(years_collect) | airport_station) %>% 
  filter(station_distance == min(station_distance)) %>% 
  ungroup()
  
```

### Assigning EPA AQS collection site to each location

Loading AQS Site information made available by the EPA: https://aqs.epa.gov/aqsweb/airdata/download_files.html, filtering sites without collection during 1990-2019

```{r}
aqs_sites <- read_csv("./initial_data/aqs_sites.csv") %>% 
  mutate(state_FIPS = str_pad(state_FIPS, width = 2, side = "left", pad = "0"),
         county_FIPS = str_pad(county_FIPS, width = 3, side = "left", pad = "0")) %>% 
  filter(as.numeric(str_sub(`Site Closed Date`, start = -2, end = -1)) > 89 | is.na(`Site Closed Date`)) %>% 
  mutate(year_est = year(mdy(`Site Established Date`)),
         year_est = ifelse(year_est > 2019, year_est - 100, year_est),
         year_closed = year(mdy(`Site Closed Date`)),
         year_closed = ifelse(year_closed > 2019, year_closed - 100, year_closed)) %>% 
  mutate(years_active = ifelse(is.na(`Site Closed Date`), 2019 - year_est, year_closed - 1989)) %>% 
  distinct()
```

AQS sites are matched to locations by state, after adding FIPS state code to each station.  Matching AQS sites to locations within 20 miles, selecting site with most available pm2.5 data, then selecting station with longest collection history.

```{r}
locations_AQS <- locations %>% 
  mutate(zip_code = str_pad(zip_code, 5, "0", side = "left")) %>% 
  left_join(read_csv("./initial_data/zip_fips_match.csv") %>% 
              rename(zip_code = ZIP) %>% 
              mutate_all(as.character) %>% 
              mutate(zip_code = str_pad(zip_code, 5, "0", side = "left"),
                     STCOUNTYFP = str_pad(STCOUNTYFP, 5, "0", side = "left"),
                     state_FIPS = substr(STCOUNTYFP, start = 1, stop = 2)) %>% 
              select(zip_code, state_FIPS),
            by = "zip_code") %>% 
  select(location_code, address, city, state, zip_code, state_FIPS, location_lat, location_lon) %>% 
  mutate(state_FIPS = str_pad(state_FIPS, width = 2, side = "left", pad = "0")) %>% 
  left_join(aqs_sites, by = "state_FIPS") %>% 
  mutate(site_distance = distance(Latitude, Longitude, location_lat, location_lon)) %>% 
  group_by(location_code) %>% 
  filter(site_distance < 20) %>%
  # first selecting AQS site with most PM2.5 data available
  filter(obs_percent == max(obs_percent, na.rm = TRUE) | (!any(obs_percent > 0) | !any(!is.na(obs_percent)))) %>% 
  # next selecting AQS site with longest collection history
  filter(years_active == max(years_active, na.rm = TRUE)) %>% 
  top_n(n = 1, wt = Address) %>% 
  rename(site_city = `City Name`,
         site_county =`County Name`,
         site_state = `State Name`,) %>% 
  mutate(stations = n()) %>% 
  ungroup() %>% 
  distinct()
```

### Downloading ISD data from NOAA

Creating a directory for temporary data storage during collection, a vector of ISD stations, and a list to hold resulting data files

```{r}
dir.create("./isd_data")

ISD_stations <- unique(locations_ISD$ISD_station)

isd_data_list <- list()

```


The function below downloads the ISD data file for the station/year combination and replace missing value indicators (-9999/9999) with NA

```{r}
source("./utils/getISD.R")
```

Below we look over the stations for which we need data, and add the resulting data to the initialized list

```{r, error=FALSE, message=FALSE, warning=FALSE}
for (station in ISD_stations){
  
  # finding beginning/end of station collection
  
  begin <- max(isd_stations_data$BEGIN[isd_stations_data$ISD_station == station], 1990)
  
  end <- isd_stations_data$END[isd_stations_data$ISD_station == station]
  
  for (year in begin:end){
    
    isd_data_list[[paste(station, year, sep = "-")]] <- getISD(year = year, station = station)
    
  }
  
}
```

Creating dataset of station/years for which the station was active, but not dataset was returned.  Another attempt will be made to downlad each dataset to avoid missing data due to server timeouts.

```{r, message=FALSE, warning=FALSE}

isd_stations <- isd_stations %>% 
  filter(ISD_station %in% unique(noaa_isd$station)) %>% 
  select(ISD_station, BEGIN, END) %>% 
  rename(station = ISD_station) %>% 
  mutate(BEGIN = as.numeric(BEGIN),
         END = as.numeric(END),
         BEGIN = ifelse(BEGIN < 1990, 1990, BEGIN))

station_years <- list()

for (isd_station in isd_stations$station){
  
  year = isd_stations$BEGIN[isd_stations$station == isd_station]:isd_stations$END[isd_stations$station == isd_station]
  
  station = rep(isd_station, length(year))
  
  station_years[[isd_station]] <- data.frame(year = year,
                                       station = station)
}

missing_isd <- bind_rows(station_years) %>% 
  left_join(noaa_isd %>% 
              distinct(station, year) %>% 
              mutate(data_available = TRUE), by = c("station", "year")) %>% 
  mutate(data_available = ifelse(is.na(data_available), FALSE, TRUE)) %>% 
  filter(data_available == FALSE) %>% 
  select(-data_available)
```

Attempting to download ISD files for missing years, to avoid missing data due to server errors

```{r}
for (station_year in 1:length(missing_isd$station)){
  
  station = missing_isd$station[station_year]
  
  year = missing_isd$year[station_year]
  
  isd_data_list[[paste(station, year, sep = "-")]] <- getISD(year = year, station = station)
    
}

```

Binding the resulting datasets together

```{r}
noaa_isd <- rbindlist(isd_data_list[!is.na(isd_data_list)])
```


### Downloading AQS data from EPA

Setting up collection: reading in the site information for which we will be collecting data

```{r}
aqs_fips_codes <- locations_AQS %>% 
  distinct(state_FIPS, county_FIPS) %>% 
  mutate(state_FIPS = str_pad(state_FIPS, side = "left", width = 2, pad = "0"),
         county_FIPS = str_pad(county_FIPS, side = "left", width = 3, pad = "0"),
         FIPS_code = paste(state_FIPS, county_FIPS, sep = ""))
```

Parameters: 

* PM2.5 (Micrograms/cubic meter): 88101 and 88502
* Carbon monoxide (CO) (ppm): 42101
* Ozone (O3) (ppm): 44201
* Air Pressure (Millibars): 64101

Setting up directory for temporary data storage, initializing vectors of site FIPS codes, parameter codes and paramter names (used in file naming on the EPA website) and a list to store resulting datasets

```{r}
dir.create("./aqs_data")

FIPS_codes <- aqs_fips_codes$FIPS_code

parameter_codes <- c("88101", "88502", "42101", "44201", "64101")

parameters_epa <- c("44201", "42101", "88101", "88502", "PRESS")

aqs_data_list <- list()
```

The funciton below returns the hourly AQS dataset given a parameter, year, fips code.

```{r}
source("./utils/getHourlyAQS.R")
```

Looping over years 1989-2019, adding resulting dataset to previously initialized list

```{r, error=FALSE, message=FALSE, warning=FALSE}
for (year in 1990:2019){
  
  for (parameter in parameters_epa){
    
    aqs_data_list[[paste(year, parameter, sep = "_")]] <- getHourlyAQS(parameter = parameter,
                                                                 year = year,
                                                                 FIPS_codes = FIPS_codes,
                                                                 parameter_codes = parameter_codes)
  }
}
```

Binding the resulting data frames, removing duplicate rows, reshaping.

```{r}
epa_aqs <-  bind_rows(aqs_data_list)  %>%
  spread(key = parameter, value = value) %>% 
    mutate(pm25 = ifelse(pm25 < -10 | pm25 > 501, NA, pm25),
           pm25_local = ifelse(pm25_local < -10 | pm25_local > 501, NA, pm25_local),
           carbon_monoxide = ifelse(carbon_monoxide < 0 | carbon_monoxide > 70, NA, carbon_monoxide),
           ozone = ifelse(ozone < 0 | ozone > 1, NA, ozone))%>%
    group_by(state_FIPS, county_FIPS, date_gmt, hour_gmt) %>% 
    summarise(pm25 = mean(pm25, na.rm = TRUE),
              pm25_local = mean(pm25_local, na.rm = TRUE),
              carbon_monoxide = mean(carbon_monoxide, na.rm = TRUE),
              ozone = mean(ozone, na.rm = TRUE),
              barometric_pressure = mean(barometric_pressure, na.rm = TRUE)) %>% 
    ungroup()
```

### Combining EPA and NOAA data for master data file

Adding ISD data to location-level information

```{r}
noaa_isd_hourly <- locations_ISD %>% 
  distinct(location_code, ISD_station, zip_code) %>% 
  # joining with ISD data
  left_join(noaa_isd %>% 
              mutate(date_gmt = paste(year,
                                      str_pad(month, side = "left", width = 2, pad = "0"),
                                      str_pad(day, side = "left", width = 2, pad = "0"), sep = "-"),
                     hour_gmt = paste(str_pad(hour, side = "left", width = 2, pad = "0"), ":00", sep = "")) %>% 
              select(-c(year, month, day, hour)) %>% 
              rename(ISD_station = station)  %>% 
              mutate(temp = ifelse(temp < -128.6 | temp > 134, NA, temp),
                     dew = ifelse(dew < -21 | dew > 95, NA, dew),
                     wind_speed = ifelse(wind_speed == 99.9, NA, wind_speed),
                     prec_1 = ifelse(prec_1 == 99.9 | prec_1 > 30.5, NA, prec_1),
                     sky = ifelse(sky %in% c(9:18), NA, sky),
                     sky = ifelse(sky == 1, 1/10, sky),
                     sky = ifelse(sky == 2, 2.5/10, sky),
                     sky = ifelse(sky == 3, 4/10, sky),
                     sky = ifelse(sky == 4, 5/10, sky),
                     sky = ifelse(sky == 5, 6/10, sky),
                     sky = ifelse(sky == 6, 7.5/10, sky),
                     sky = ifelse(sky == 7, 9.5/10, sky),
                     sky = ifelse(sky == 8, 10/10, sky)), by = "ISD_station") %>% 
  group_by(location_code, date_gmt, hour_gmt, zip_code) %>% 
  summarise(prec_noaa = mean(prec_1, na.rm = TRUE),
            temp_noaa = mean(temp, na.rm = TRUE),
            dew_noaa = mean(dew, na.rm = TRUE),
            press_noaa = mean(press, na.rm = TRUE),
            wind_speed_noaa = mean(wind_speed, na.rm = TRUE),
            mean(sky, na.rm = TRUE),
            station_count = n(),
            prec_noaa_count = sum(!is.na(prec_1)),
            temp_noaa_count = sum(!is.na(temp)),
            dew_noaa_count = sum(!is.na(dew)),
            press_noaa_count = sum(!is.na(press)),
            wind_speed_noaa_count = sum(!is.na(wind_speed)),
            sky_noaa_count = sum(!is.na(sky))) %>% 
  ungroup()
  
  
```

Adding AQS data to location-level information

```{r}
epa_aqs_hourly <- locations_AQS %>% 
  distinct(location_code, state_FIPS, county_FIPS, zip_code, site_number) %>% 
  mutate(state_FIPS = str_pad(state_FIPS, side = "left", width = 2, pad = "0"),
         county_FIPS = str_pad(county_FIPS, side = "left", width = 3, pad = "0")) %>% 
  left_join(epa_aqs %>% 
              separate(date_gmt, into = c("year", "month", "day"), sep = "-") %>%
              mutate(date_gmt = paste(year,
                                      str_pad(month, side = "left", width = 2, pad = "0"),
                                      str_pad(day, side = "left", width = 2, pad = "0"), sep = "-")) %>% 
              select(-c(year, month, day)) %>%
              mutate(state_FIPS = str_pad(state_FIPS, side = "left", width = 2, pad = "0"),
                     county_FIPS = str_pad(county_FIPS, side = "left", width = 3, pad = "0")) %>% 
              rename(pm25 = PM2.5,
                     pm25_local = PM2.5_local,
                     carbon_monoxide = CO,
                     barometric_pressure = press_epa)  %>% 
              mutate(pm25 = ifelse(pm25 < -10 | pm25 > 501, NA, pm25),
                     pm25_local = ifelse(pm25_local < -10 | pm25_local > 501, NA, pm25_local),
                     carbon_monoxide = ifelse(carbon_monoxide < 0 | carbon_monoxide > 70, NA, carbon_monoxide),
                     ozone = ifelse(ozone < 0 | ozone > 1, NA, ozone)),
            by = c("state_FIPS", "county_FIPS", "site_number")) %>%
  group_by(location_code, date_gmt, hour_gmt, zip_code) %>% 
  summarise(pm25 = mean(pm25, na.rm = TRUE),
            pm25_local = mean(pm25_local, na.rm = TRUE),
            ozone = mean(ozone, na.rm = TRUE),
            carbon_monoxide = mean(carbon_monoxide, na.rm = TRUE),
            barometric_pressure = mean(barometric_pressure, na.rm = TRUE)) %>% 
  ungroup()
```

Combining AQS and ISD data, cleaning up pressure variable (units conversion, impute missing values across EPA/NOAA data), writing resulting data to csv file.

```{r}
epa_aqs_hourly %>% 
  full_join(noaa_isd_hourly, by = c("location_code", "date_gmt", "hour_gmt", "zip_code")) %>% 
  mutate(pressure = ifelse(is.na(press_noaa), barometric_pressure, press_noaa),
         pressure = ifelse(pressure > 20000, ((1 - (pressure / 145366.45)) ^ (1/0.190284)) * 1013.25, pressure),
         pressure = ifelse(pressure > 0 & pressure < 40, pressure * 33.8639, pressure),
         pressure = ifelse(pressure > 400 & pressure < 900, ((1 - ((pressure * 3.281)/ 145366.45)) ^ (1/0.190284)) * 1013.25, pressure),
         pressure = ifelse(pressure < 870 | pressure > 1084, NA, pressure)) %>%
  select(-c(press_noaa, barometric_pressure)) %>%
  write_csv("./output/epa_noaa_data.csv")
```

### Daily AQS Data Download 

We also collected the daily summary measures for AQS from the EPA.

Parameters: 

* PM2.5 (Micrograms/cubic meter): 88101 and 88502
* Carbon monoxide (CO) (ppm): 42101
* Ozone (O3) (ppm): (44201) 

Setting up directory for temporary data storage, initializing vectors of site FIPS codes, parameter codes and paramter names (used in file naming on the EPA website) and a list to store resulting datasets

```{r}

parameter_codes <- c("88101", "88502", "42101", "44201")

parameters_epa <- c("44201", "42101", "88101", "88502")

aqs_data_list <- list()

```

The funciton below returns the daily AQS dataset given a parameter, year, fips code.

```{r}
source("./utils/getDailyAQS.R")
```

Looping over years 1990-2019, adding resulting dataset to previously initialized list

```{r, error=FALSE, message=FALSE, warning=FALSE}
for (year in 1990:2019){
  
  for (parameter in parameters_epa){
    
    aqs_data_list[[paste(year, parameter, sep = "_")]] <- getDailyAQS(parameter = parameter,
                                                                 year = year,
                                                                 FIPS_codes = FIPS_codes_site,
                                                                 parameter_codes = parameter_codes)
  }
}
```

Binding the resulting data frames, removing duplicate rows, reshaping and saving to a csv file

```{r}
epa_aqs_daily <- bind_rows(aqs_data_list) %>%
  filter(parameter %in% c("PM2.5", "PM2.5_local", "CO", "ozone")) %>%
  group_by(state_FIPS, county_FIPS, date_local, parameter) %>%
  filter(row_number()==1) %>% 
  mutate(value = as.numeric(value)) %>%
  ungroup() %>%
  spread(key = parameter, value = value)
```

### Reshaping Daily AQS data

Adding AQS data to location-level information: cleaning variables, saving to csv

```{r}
locations_AQS %>% 
  distinct(location_code, state_FIPS, county_FIPS, zip_code) %>% 
  mutate(state_FIPS = str_pad(state_FIPS, side = "left", width = 2, pad = "0"),
         county_FIPS = str_pad(county_FIPS, side = "left", width = 3, pad = "0")) %>% 
  left_join(epa_aqs_daily %>% 
              separate(date_local, into = c("year", "month", "day"), sep = "-") %>%
              mutate(date_local = paste(year,
                                      str_pad(month, side = "left", width = 2, pad = "0"),
                                      str_pad(day, side = "left", width = 2, pad = "0"), sep = "-")) %>% 
              select(-c(year, month, day)) %>%
              mutate(state_FIPS = str_pad(state_FIPS, side = "left", width = 2, pad = "0"),
                     county_FIPS = str_pad(county_FIPS, side = "left", width = 3, pad = "0")) %>% 
              rename(pm25 = PM2.5,
                     pm25_local = PM2.5_local,
                     carbon_monoxide = CO)  %>% 
              mutate(pm25 = ifelse(pm25 < -10 | pm25 > 501, NA, pm25),
                     pm25_local = ifelse(pm25_local < -10 | pm25_local > 501, NA, pm25_local),
                     carbon_monoxide = ifelse(carbon_monoxide < 0 | carbon_monoxide > 70, NA, carbon_monoxide),
                     ozone = ifelse(ozone < 0 | ozone > 1, NA, ozone)),
            by = c("state_FIPS", "county_FIPS", "site_number")) %>% 
  write_csv("./output/aqs_daily.csv")
```

